package com.niit.flume;

import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;

import java.util.HashMap;

/*
自定义接收器
 */
public class MySource extends AbstractSource implements Configurable, PollableSource {


    //定义配置文件将要读取的字段
    private Long delay; //延迟
    private String field;//字段

    //接受数据，封装一个一个event
    @Override
    public Status process() throws EventDeliveryException {
        try{
            //循环封装事件
            for (int i =0;i<5;i++){
                //创建事件头信息
                HashMap<String,String> headerMap = new HashMap<>();
                //创建事件
                SimpleEvent event = new SimpleEvent();
                //给事件设置头信息
                event.setHeaders(headerMap);
                //给事件设置内容
                event.setBody(( field + i).getBytes() );
                //将事件吸入到channel
                getChannelProcessor().processEvent(event);
                //设置延迟
                Thread.sleep(delay);
            }

        }catch (Exception e){
            e.printStackTrace();
            return Status.BACKOFF;
        }

        return Status.READY;
    }

    @Override
    public long getBackOffSleepIncrement() {
        return 0;
    }

    @Override
    public long getMaxBackOffSleepInterval() {
        return 0;
    }

    //读取某个配置文件的配置信息
    @Override
    public void configure(Context context) {
        //读取配置文件中的 dealy
        delay = context.getLong("delay");
        //读取配置文件中的 field ==FLUME  ==>FLUME0 FLUME1 FLUME2
        field = context.getString("field","FLUME");
    }
}
